---
toc_max_heading_level: 4
sidebar_label: Rust
title: TDengine Rust Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import Preparation from "./_preparation.mdx"
import RustInsert from "../07-develop/03-insert-data/_rust_sql.mdx"
import RustBind from "../07-develop/03-insert-data/_rust_stmt.mdx"
import RustSml from "../07-develop/03-insert-data/_rust_schemaless.mdx"
import RustQuery from "../07-develop/04-query-data/_rust.mdx"

[![Crates.io](https://img.shields.io/crates/v/taos)](https://crates.io/crates/taos) ![Crates.io](https://img.shields.io/crates/d/taos) [![docs.rs](https://img.shields.io/docsrs/taos)](https://docs.rs/taos)

`taos` 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。

`taos` 提供两种建立连接的方式。一种是**原生连接**，它通过 TDengine 客户端驱动程序（taosc）连接 TDengine 运行实例。另外一种是 **Websocket 连接**，它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性（即 Cargo 关键字 `features`）” 来指定使用哪种连接器（默认同时支持）。Websocket 连接支持任何平台，原生连接支持所有 TDengine 客户端能运行的平台。

该 Rust 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-rust)。

## 支持的平台

原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
Websocket 连接支持所有能运行 Rust 的平台。

## 版本历史

|   Rust 连接器版本   |   TDengine 版本   |                        主要功能                       |
| :----------------: | :--------------: | :--------------------------------------------------: |
|       v0.9.2       |      3.0.7.0 or later    | STMT：ws 下获取 tag_fields、col_fields。 |
|       v0.8.12       |      3.0.5.0    | 消息订阅：获取消费进度及按照指定进度开始消费。 |
|       v0.8.0       |      3.0.4.0     | 支持无模式写入。 |
|       v0.7.6       |      3.0.3.0     | 支持在请求中使用 req_id。 |
|       v0.6.0       |      3.0.0.0     | 基础功能。 |

Rust 连接器仍然在快速开发中，1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine，以避免已知问题。

## 处理错误

在报错后，可以获取到错误的具体信息：

```rust
match conn.exec(sql) {
    Ok(_) => {
        Ok(())
    }
    Err(e) => {
        eprintln!("ERROR: {:?}", e);
        Err(e)
    }
}
```

## TDengine DataType 和 Rust DataType

TDengine 目前支持时间戳、数字、字符、布尔类型，与 Rust 对应类型转换如下：

| TDengine DataType | Rust DataType     |
| ----------------- | ----------------- |
| TIMESTAMP         | Timestamp         |
| INT               | i32               |
| BIGINT            | i64               |
| FLOAT             | f32               |
| DOUBLE            | f64               |
| SMALLINT          | i16               |
| TINYINT           | i8                |
| BOOL              | bool              |
| BINARY            | Vec<u8\>          |
| NCHAR             | String            |
| JSON              | serde_json::Value |

**注意**：JSON 类型仅在 tag 中支持。

## 安装步骤

### 安装前准备

* 安装 Rust 开发工具链
* 如果使用原生连接，请安装 TDengine 客户端驱动，具体步骤请参考[安装客户端驱动](../#安装客户端驱动)

### 安装连接器

根据选择的连接方式，按照如下说明在 [Rust](https://rust-lang.org) 项目中添加 [taos][taos] 依赖：

<Tabs defaultValue="default">
<TabItem value="default" label="同时支持">

在 `Cargo.toml` 文件中添加 [taos][taos]：

```toml
[dependencies]
# use default feature
taos = "*"
```

</TabItem>

<TabItem value="rest" label="仅 Websocket">

在 `Cargo.toml` 文件中添加 [taos][taos]，并启用 `ws` 特性。

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
```

当仅启用 `ws` 特性时，可同时指定 `r2d2` 使得在同步（blocking/sync）模式下使用 [r2d2] 作为连接池：

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["r2d2", "ws"] }
```

</TabItem>

<TabItem value="native" label="仅原生连接">

在 `Cargo.toml` 文件中添加 [taos][taos]，并启用 `native` 特性：

```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
```

</TabItem>
</Tabs>

## 建立连接

[TaosBuilder] 通过 DSN 连接描述字符串创建一个连接构造器。

```rust
let builder = TaosBuilder::from_dsn("taos://")?;
```

现在您可以使用该对象创建连接：

```rust
let conn = builder.build()?;
```

连接对象可以创建多个：

```rust
let conn1 = builder.build()?;
let conn2 = builder.build()?;
```

DSN 描述字符串基本结构如下：

```text
<driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
|------|------------|---|-----------|-----------|------|------|------------|-----------------------|
|driver|   protocol |   | username  | password  | host | port |  database  |  params               |
```

各部分意义见下表：

- **driver**: 必须指定驱动名以便连接器选择何种方式创建连接，支持如下驱动名：
  - **taos**: 表名使用 TDengine 连接器驱动。
  - **tmq**: 使用 TMQ 订阅数据。
  - **http/ws**: 使用 Websocket 创建连接。
  - **https/wss**: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
- **protocol**: 显示指定以何种方式建立连接，例如：`taos+ws://localhost:6041` 指定以 Websocket 方式建立连接。
- **username/password**: 用于创建连接的用户名及密码。
- **host/port**: 指定创建连接的服务器及端口，当不指定服务器地址及端口时（`taos://`），原生连接默认为 `localhost:6030`，Websocket 连接默认为 `localhost:6041` 。
- **database**: 指定默认连接的数据库名，可选参数。
- **params**：其他可选参数。

一个完整的 DSN 描述字符串示例如下：

```text
taos+ws://localhost:6041/test
```

表示使用 Websocket（`ws`）方式通过 `6041` 端口连接服务器 `localhost`，并指定默认数据库为 `test`。

这使得用户可以通过 DSN 指定连接方式：

```rust
use taos::*;

// use native protocol.
let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
let conn1 = builder.build();

//  use websocket protocol.
let builder2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
let conn2 = builder2.build();
```

建立连接后，您可以进行相关数据库操作：

```rust
async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    // prepare database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    let inserted = taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(24))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
        // insert into child table
        "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
        // insert with NULL values
        "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
        // insert and automatically create table with tags if not exists
        "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
        // insert many records in a single sql
        "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    ]).await?;

    assert_eq!(inserted, 6);
    let mut result = taos.query("select * from `meters`").await?;

    for field in result.fields() {
        println!("got field: {}", field.name());
    }

    let values = result.
}
```

查询数据可以通过两种方式：使用内建类型或 [serde](https://serde.rs) 序列化框架。

```rust
    // Query option 1, use rows stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("got value of {}: {}", name, value);
        }
    }

    // Query options 2, use deserialization with serde.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;

    dbg!(records);
    Ok(())
```

## 使用示例

### 创建数据库和表

```rust
use taos::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dsn = "taos://localhost:6030";
    let builder = TaosBuilder::from_dsn(dsn)?;

    let taos = builder.build()?;

    let db = "query";

    // create database
    taos.exec_many([
        format!("DROP DATABASE IF EXISTS `{db}`"),
        format!("CREATE DATABASE `{db}`"),
        format!("USE `{db}`"),
    ])
    .await?;

    // create table
    taos.exec_many([
        // create super table
        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
         TAGS (`groupid` INT, `location` BINARY(16))",
        // create child table
        "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')",
    ]).await?;
}
```

> **注意**：如果不使用 `use db` 指定数据库，则后续对表的操作都需要增加数据库名称作为前缀，如 db.tb。

### 插入数据

<RustInsert />

### 查询数据

<RustQuery />

### 执行带有 req_id 的 SQL

此 req_id 可用于请求链路追踪。

```rust
let rs = taos.query_with_req_id("select * from stable where tag1 is null", 1)?;
```

### 通过参数绑定写入数据

TDengine 的 Rust 连接器实现了参数绑定方式对数据写入（INSERT）场景的支持。采用这种方式写入数据时，能避免 SQL 语法解析的资源消耗，从而在很多情况下显著提升写入性能。

参数绑定接口详见[API参考](#stmt-api)

<RustBind />

### 无模式写入

TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议（Line Protocol）、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。

<RustSml />

### 执行带有 req_id 的无模式写入

此 req_id 可用于请求链路追踪。

```rust
let sml_data = SmlDataBuilder::default()
    .protocol(SchemalessProtocol::Line)
    .data(data)
    .req_id(100u64)
    .build()?;

client.put(&sml_data)?
```

### 数据订阅

TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。

#### 创建 Topic

```rust
taos.exec_many([
    // create topic for subscription
    format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}")
])
.await?;
```

#### 创建 Consumer

从 DSN 开始，构建一个 TMQ 连接器。

```rust
let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;
```

创建消费者：

```rust
let mut consumer = tmq.build()?;
```

#### 订阅消费数据

消费者可订阅一个或多个 `TOPIC`。

```rust
consumer.subscribe(["tmq_meters"]).await?;
```

TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futures/stream/index.html) 类型，可以使用相应 API 对每个消息进行消费，并通过 `.commit` 进行已消费标记。

```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
            }
        }
        consumer.commit(offset).await?;
    }
}
```

获取消费进度：

版本要求 connector-rust >= v0.8.8， TDengine >= 3.0.5.0

```rust
let assignments = consumer.assignments().await.unwrap();
```

#### 指定订阅 Offset

按照指定的进度消费：

版本要求 connector-rust >= v0.8.8， TDengine >= 3.0.5.0

```rust
consumer.offset_seek(topic, vgroup_id, offset).await;
```

#### 关闭订阅

```rust
consumer.unsubscribe().await;
```

对于 TMQ DSN, 有以下配置项可以进行设置，需要注意的是，`group.id` 是必须的。

- `group.id`: 同一个消费者组，将以至少消费一次的方式进行消息负载均衡。
- `client.id`: 可选的订阅客户端识别项。
- `auto.offset.reset`: 可选初始化订阅起点， *earliest* 为从头开始订阅， *latest* 为仅从最新数据开始订阅，默认为从头订阅。注意，此选项在同一个 `group.id` 中仅生效一次。
- `enable.auto.commit`: 当设置为 `true` 时，将启用自动标记模式，当对数据一致性不敏感时，可以启用此方式。
- `auto.commit.interval.ms`: 自动标记的时间间隔。

#### 完整示例

完整订阅示例参见 [GitHub 示例文件](https://github.com/taosdata/TDengine/blob/3.0/docs/examples/rust/nativeexample/examples/subscribe_demo.rs).

### 与连接池使用

在复杂应用中，建议启用连接池。[taos] 的连接池默认（异步模式）使用 [deadpool] 实现。

如下，可以生成一个默认参数的连接池。

```rust
let pool: Pool<TaosBuilder> = TaosBuilder::from_dsn("taos:///")
    .unwrap()
    .pool()
    .unwrap();
```

同样可以使用连接池的构造器，对连接池参数进行设置：

```rust
let pool: Pool<TaosBuilder> = Pool::builder(Manager::from_dsn(self.dsn.clone()).unwrap().0)
    .max_size(88)  // 最大连接数
    .build()
    .unwrap();
```

在应用代码中，使用 `pool.get()?` 来获取一个连接对象 [Taos]。

```rust
let taos = pool.get()?;
```

### 更多示例程序

示例程序源码位于 `TDengine/examples/rust` 下:

请参考：[rust example](https://github.com/taosdata/TDengine/tree/3.0/examples/rust)

## 常见问题

请参考 [FAQ](../../../train-faq/faq)

## API 参考

[Taos][struct.Taos] 对象提供了多个数据库操作的 API：

1. `exec`: 执行某个非查询类 SQL 语句，例如 `CREATE`，`ALTER`，`INSERT` 等。

    ```rust
    let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
    ```

2. `exec_many`: 同时（顺序）执行多个 SQL 语句。

    ```rust
    taos.exec_many([
        "CREATE DATABASE test",
        "USE test",
        "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
    ]).await?;
    ```

3. `query`：执行查询语句，返回 [ResultSet] 对象。

    ```rust
    let mut q = taos.query("select * from log.logs").await?;
    ```

    [ResultSet] 对象存储了查询结果数据和返回的列的基本信息（列名，类型，长度）：

    列信息使用 [.fields()] 方法获取：

    ```rust
    let cols = q.fields();
    for col in cols {
        println!("name: {}, type: {:?} , bytes: {}", col.name(), col.ty(), col.bytes());
    }
    ```

    逐行获取数据：

    ```rust
    let mut rows = result.rows();
    let mut nrows = 0;
    while let Some(row) = rows.try_next().await? {
        for (col, (name, value)) in row.enumerate() {
            println!(
                "[{}] got value in col {} (named `{:>8}`): {}",
                nrows, col, name, value
            );
        }
        nrows += 1;
    }
    ```

    或使用 [serde](https://serde.rs) 序列化框架。

    ```rust
    #[derive(Debug, Deserialize)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;
    ```

需要注意的是，需要使用 Rust 异步函数和异步运行时。

[Taos][struct.Taos] 提供部分 SQL 的 Rust 方法化以减少 `format!` 代码块的频率：

- `.describe(table: &str)`: 执行 `DESCRIBE` 并返回一个 Rust 数据结构。
- `.create_database(database: &str)`: 执行 `CREATE DATABASE` 语句。
- `.use_database(database: &str)`: 执行 `USE` 语句。

除此之外，该结构也是参数绑定和行协议接口的入口，使用方法请参考具体的 API 说明。

<p>
<a id="stmt-api" style={{color:'#141414'}}>
参数绑定接口
</a>
</p>

与 C 接口类似，Rust 提供参数绑定接口。首先，通过 [Taos][struct.Taos] 对象创建一个 SQL 语句的参数绑定对象 [Stmt]：

```rust
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
```

参数绑定对象提供了一组接口用于实现参数绑定：

`.set_tbname(name)`

用于绑定表名。

```rust
let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
stmt.set_tbname("d0")?;
```

`.set_tags(&[tag])`

当 SQL 语句使用超级表时，用于绑定子表表名和标签值：

```rust
let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
stmt.set_tbname("d0")?;
stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;
```

`.bind(&[column])`

用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定：

```rust
let params = vec![
    ColumnView::from_millis_timestamp(vec![164000000000]),
    ColumnView::from_bools(vec![true]),
    ColumnView::from_tiny_ints(vec![i8::MAX]),
    ColumnView::from_small_ints(vec![i16::MAX]),
    ColumnView::from_ints(vec![i32::MAX]),
    ColumnView::from_big_ints(vec![i64::MAX]),
    ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
    ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
    ColumnView::from_unsigned_ints(vec![u32::MAX]),
    ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
    ColumnView::from_floats(vec![f32::MAX]),
    ColumnView::from_doubles(vec![f64::MAX]),
    ColumnView::from_varchar(vec!["ABC"]),
    ColumnView::from_nchar(vec!["涛思数据"]),
];
let rows = stmt.bind(&params)?.add_batch()?.execute()?;
```

`.execute()`

执行 SQL。[Stmt] 对象可以复用，在执行后可以重新绑定并执行。执行前请确保所有数据已通过 `.add_batch` 加入到执行队列中。

```rust
stmt.execute()?;

// next bind cycle.
//stmt.set_tbname()?;
//stmt.bind()?;
//stmt.execute()?;
```

一个可运行的示例请见 [GitHub 上的示例](https://github.com/taosdata/taos-connector-rust/blob/main/examples/bind.rs)。


其他相关结构体 API 使用说明请移步 Rust 文档托管网页：<https://docs.rs/taos>。

[taos]: https://github.com/taosdata/rust-connector-taos
[deadpool]: https://crates.io/crates/deadpool
[r2d2]: https://crates.io/crates/r2d2
[TaosBuilder]: https://docs.rs/taos/latest/taos/struct.TaosBuilder.html
[TaosCfg]: https://docs.rs/taos/latest/taos/struct.TaosCfg.html
[struct.Taos]: https://docs.rs/taos/latest/taos/struct.Taos.html
[Stmt]: https://docs.rs/taos/latest/taos/struct.Stmt.html
